package org.zjvis.datascience.service;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;
import org.zjvis.datascience.common.constant.DatabaseConstant;
import org.zjvis.datascience.common.constant.DatasetConstant;
import org.zjvis.datascience.common.dto.DatasetDTO;
import org.zjvis.datascience.common.dto.dataset.DatasetColumnDTO;
import org.zjvis.datascience.common.dto.dataset.DatasetJsonInfo;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.util.DatasetUtil;
import org.zjvis.datascience.common.util.JwtUtil;
import org.zjvis.datascience.common.util.RedisUtil;
import org.zjvis.datascience.common.util.SqlUtil;
import org.zjvis.datascience.common.util.db.CryptoUtil;
import org.zjvis.datascience.common.util.db.DataTypeValidator;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.dataset.HeadVO;
import org.zjvis.datascience.common.vo.dataset.PreviewDatasetVO;
import org.zjvis.datascience.common.vo.http.HttpDataCreateTableVO;
import org.zjvis.datascience.common.vo.http.HttpStatusVO;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.dataprovider.JdbcDataProvider;
import org.zjvis.datascience.service.dataset.DatasetService;
import org.zjvis.datascience.service.dataset.ImportDataService;
import org.zjvis.datascience.service.mapper.DatasetMapper;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Entity;

/**
 * @description HTTP data service — parses HTTP-pushed JSON payloads into previews,
 * validates and masks column values, creates target Greenplum tables, and batch-imports
 * data buffered in Redis into Greenplum.
 * @date 2021-12-06
 */
@Service
public class HttpDataService {

    @Autowired
    private DatasetService datasetService;

    @Autowired
    DatasetMapper datasetMapper;

    @Autowired
    RedisUtil redisUtil;

    @Autowired
    GPDataProvider gpDataProvider;

    @Autowired
    SftpConnectService sftpConnectService;

    @Autowired
    DatasetActionService datasetActionService;

    // FIX: logger was created with JdbcDataProvider.class (copy-paste slip), which
    // mislabeled every log line emitted by this service.
    protected static final Logger logger = LoggerFactory.getLogger(HttpDataService.class);

    // Page size for reading buffered rows out of Redis per import round.
    private static final int BATCH_INSERT_SIZE = 3000;

    // greenplum数据库端口 (Greenplum database port)
    @Value("${greenplum.port}")
    private int gpPort;
    // 连接greenplum数据库url (Greenplum connection URL)
    @Value("${greenplum.url}")
    private String gpUrl;
    // 连接greenplum数据库 用户名 (Greenplum username)
    @Value("${greenplum.username}")
    private String gpUsername;
    // 连接greenplum数据库 密码 (Greenplum password)
    @Value("${greenplum.password}")
    private String gpPassword;
    @Value("${greenplum.master-host-name}")
    private String gpMasterHostName;
    @Value("${greenplum.gpload.version}")
    private String gpLoadVersion;
    @Value("${greenplum.gpload.local-host-name}")
    private String gpLoadLocalHostName;
    @Value("${greenplum.gpload.port-range}")
    private String gpLoadPortRange;
    @Value("${greenplum.gpload.database}")
    private String gpLoadDatabase;


    /**
     * Parse a raw JSON string (single object or array of objects) into a preview payload.
     *
     * <p>Returns a JSONObject with {@code code} 200 and a {@code data} preview on success,
     * or {@code code} 702 with a {@code tips} message when the input cannot be parsed.
     * A {@code tips} warning is also attached when column names contain special characters.
     *
     * @param str raw JSON text pushed over HTTP
     * @return result envelope with code / data / tips
     */
    public JSONObject jsonParsing(String str) {
        JSONObject jsonObject;
        // Union of keys/values across all objects; later duplicates overwrite in `data`.
        Set<Map.Entry<String, Object>> entries = new HashSet<>();
        // FIX: previous test was startsWith("[") || endsWith("]"), which routed inputs
        // like "{...}]" to the array branch; a JSON array must begin with '['.
        if (str.trim().startsWith("[")) {
            try {
                // FIX: parseArray moved inside the try — a malformed array previously
                // escaped as an uncaught exception instead of the 702 error payload
                // that the object branch already returns.
                JSONArray jsonArray = JSON.parseArray(str);
                for (Object o : jsonArray) {
                    JSONObject jo = JSON.parseObject(o.toString());
                    entries.addAll(jo.entrySet());
                }
            } catch (Exception e) {
                jsonObject = new JSONObject();
                jsonObject.put("tips", e.toString());
                jsonObject.put("code", 702);
                return jsonObject;
            }
        } else {
            JSONObject jo;
            try {
                jo = JSON.parseObject(str);
            } catch (Exception e) {
                jsonObject = new JSONObject();
                jsonObject.put("tips", e.toString());
                jsonObject.put("code", 702);
                return jsonObject;
            }
            entries.addAll(jo.entrySet());
        }
        List<String> keys = new ArrayList<>();
        Entity data = new Entity();
        for (Map.Entry<String, Object> entry : entries) {
            keys.add(entry.getKey());
            data.set(entry.getKey(), entry.getValue());
        }
        List<HeadVO> head = PreviewDatasetVO.toHead(keys);

        // Recommend a column type per head from the single sampled row.
        List<Entity> values = DatasetUtil.recommendDataType(head, Collections.singletonList(data), DatasetConstant.DATA_PREVIEW_SIZE);

        PreviewDatasetVO previewHttpDatasetVO = PreviewDatasetVO.builder()
                .data(values)
                .head(head)
                .owner(JwtUtil.getCurrentUserDTO().getName())
                .build();
        jsonObject = new JSONObject();
        if (checkHead(previewHttpDatasetVO.getHead())) {
            jsonObject.put("tips", DatasetConstant.TIPS_SPECIAL_CHARACTER);
        }
        jsonObject.put("data", previewHttpDatasetVO);
        jsonObject.put("code", 200);
        return jsonObject;
    }

    /**
     * Persist the dataset update and report whether the incremental-data config changed.
     *
     * @param datasetDTO dataset to update
     * @param vo         carries the previous incremental-data config for comparison
     * @return true when the stored incremental config now differs from the previous one
     */
    public Boolean updateHttpDataStatus(DatasetDTO datasetDTO, HttpStatusVO vo) {
        datasetMapper.update(datasetDTO);
        DatasetDTO dto = datasetMapper.queryById(datasetDTO.getId());
        // FIX: null-safe comparison — a null incremental config previously threw NPE.
        return !Objects.equals(dto.getIncrementalDataConfig(), vo.getPreIncrementalDataConfig());
    }

    /**
     * 检查字段中是否有空格或特殊字符
     * (Check whether any column name contains whitespace or special characters.)
     *
     * @param heads column headers to inspect
     * @return true if at least one trimmed name matches the special-character pattern
     */
    public boolean checkHead(List<HeadVO> heads) {
        for (HeadVO head : heads) {
            if (head.getName().trim().matches(DatasetConstant.SPECIAL_CHARACTER_REGEX)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Validate the pushed sample data, create the target Greenplum table, and register
     * the dataset (with its access URL and "on" status).
     *
     * @param vo creation request: sample data, column definitions, category, name
     * @return id of the saved dataset record (0 when the save failed)
     */
    public Long httpDataCreateTable(HttpDataCreateTableVO vo) {
        Long id = JwtUtil.getCurrentUserDTO().getId();
        Long res = 0L;
        String importType = vo.getImportType();

        String status = "on";
        // 数据字段属性 (sample values, positionally aligned with colTypes)
        List<Object> data = vo.getData();
        // 数据字段 (declared column definitions)
        List<DatasetColumnDTO> colTypes = vo.getColumnDTOList();

        // 验证字段类型 — throws DataScienceException on the first mismatch.
        for (int i = 0; i < data.size(); i++) {
            doValidate(data.get(i), colTypes.get(i));
        }

        // 将加密的列在数据库中存为varchar类型 — masked columns always hold string
        // digests/mosaics, so force VARCHAR regardless of the declared type.
        for (DatasetColumnDTO colType : colTypes) {
            if (colType.getDataMaskingType() == null) {
                continue;
            }
            switch (colType.getDataMaskingType().toLowerCase()) {
                case "md5":
                case "sha1":
                case "mosaic":
                    colType.setType("VARCHAR");
                    break;
                default:
                    break;
            }
        }

        // Only columns flagged for import participate in the table definition.
        List<DatasetColumnDTO> cols = new ArrayList<>();
        for (DatasetColumnDTO colType : colTypes) {
            if (colType.getImportColumn()) {
                cols.add(colType);
            }
        }
        String targetTable = ImportDataService.generateGpTableName();
        sftpConnectService.createTable(targetTable, cols);
        // The dataset's public URL token is the MD5 of the generated table name.
        String url = DigestUtils.md5DigestAsHex(targetTable.getBytes(StandardCharsets.UTF_8));

        DatasetJsonInfo dj = DatasetJsonInfo.builder()
                .schema(DatabaseConstant.GREEN_PLUM_DEFAULT_SCHEMA)
                .table(targetTable)
                .type(importType)
                .columnMessage(vo.getColumnDTOList())
                .build();
        // 增加url和开启状态两个信息 (persist with URL token and "on" status)
        res = datasetService.saveDataset(id, vo.getCategoryId(), vo.getName(), dj, url, status);
        if (res > 0) {
            // Cache the url -> dataset-json mapping used by the import job.
            redisUtil.hset("urlToDataJson", url, dj.toString());
        }
        return res;
    }

    /**
     * Validate one raw value against its declared column type. Null values are accepted.
     *
     * @param record raw value (nullable)
     * @param field  column definition carrying the target type
     * @throws DataScienceException when the value cannot be converted to the target type
     */
    public void doValidate(Object record, DatasetColumnDTO field) {
        if (record == null) {
            return;
        }
        try {
            DataTypeValidator.validateAndMatch(record.toString(), field.getType());
        } catch (DataScienceException e) {
            String errorData = StrUtil.format("{}:{}  -  目标类型->{}",
                    field.getName(), record, field.getType());
            throw DataScienceException.of(BaseErrorCode.DATASET_IMPORT_DATA_TRANSFORM_FAIL, null, errorData);
        }
    }

    /**
     * Convert buffered raw JSON rows into entities and batch-write them to Greenplum.
     * Rows that are not valid JSON are silently skipped (best-effort import).
     *
     * @param dataJson serialized DatasetJsonInfo describing the target table and columns
     * @param objects  raw JSON strings popped from Redis
     * @return number of rows handed to the writer (0 when the connection failed early)
     */
    public int saveHttpDataToGP(String dataJson, List<Object> objects) {
        List<Entity> data = null;
        try (Connection conn = gpDataProvider.getConn(DatabaseConstant.GREEN_PLUM_DATASET_ID)) {
            data = new LinkedList<>();
            DatasetJsonInfo dj = JSONObject.parseObject(dataJson, DatasetJsonInfo.class);
            for (Object object : objects) {
                JSONObject jsonObject;
                try {
                    jsonObject = JSON.parseObject((String) object);
                } catch (Exception e) {
                    // Malformed row — skip and continue with the rest of the batch.
                    continue;
                }
                // 每个数据必须加上数据库名 — each entity carries schema-qualified table name.
                // 每行处理 (per-row handling)
                Entity handled = handleRaw(jsonObject,
                        DatabaseConstant.GREEN_PLUM_DEFAULT_SCHEMA + "." + dj.getTable(),
                        dj.getColumnMessage());
                if (handled.size() != 0) {
                    data.add(handled);
                }
            }
            JDBCUtil.doWriteToTable(data, gpDataProvider.getSource(DatabaseConstant.GREEN_PLUM_DATASET_ID));
        } catch (Exception e) {
            // FIX: pass the throwable so the stack trace is preserved in the log.
            logger.error("error: saveHttpDataToGP ", e);
        }
        return data == null ? 0 : data.size();
    }

    /**
     * Build a writable Entity from one raw JSON row, keeping only declared columns
     * whose values validate successfully.
     *
     * @param raw            one incoming JSON row
     * @param newTable       schema-qualified target table name
     * @param columnMessages declared column definitions
     * @return entity with wrapped field names; may be empty if nothing validated
     */
    public Entity handleRaw(JSONObject raw, String newTable, List<DatasetColumnDTO> columnMessages) {
        Entity data = new Entity();
        // 每列处理 (per-column handling)
        for (int i = 0; i < columnMessages.size(); i++) {
            // 每列获取值 — null means validation failed, so the column is dropped.
            Object columnValue = getColumnValue(raw, columnMessages.get(i));
            if (columnValue != null) {
                data.put(JDBCUtil.GP_WRAPPER.wrap(SqlUtil.formatFieldName(columnMessages.get(i).getName())), columnValue);
            }
        }
        if (StringUtils.isNoneBlank(newTable)) {
            data.setTableName(newTable);
        }
        data.setFieldNames(data.keySet());
        return data;
    }

    /**
     * Extract and (optionally) mask one column value from a raw row.
     *
     * @param jo            raw JSON row
     * @param columnMessage column definition (name, type, masking type)
     * @return the (possibly masked) string value, or null when validation fails
     */
    public Object getColumnValue(JSONObject jo, DatasetColumnDTO columnMessage) {
        Object rawValue = null;
        try {
            doValidate(jo.get(columnMessage.getName()), columnMessage);
            String maskingType = columnMessage.getDataMaskingType();
            rawValue = jo.getString(columnMessage.getName());

            if (StringUtils.isNotBlank(maskingType)) {
                try {
                    switch (maskingType) {
                        case "md5":
                            rawValue = CryptoUtil.hmacMD5(String.valueOf(rawValue));
                            break;
                        case "sha1":
                            rawValue = CryptoUtil.hmacSHA1(String.valueOf(rawValue));
                            break;
                        case "mosaic":
                            rawValue = CryptoUtil.stringMosaic(String.valueOf(rawValue));
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    // FIX: was e.printStackTrace() — route through the service logger.
                    // Masking failure falls back to the unmasked value, as before.
                    logger.warn("getColumnValue: data masking failed", e);
                }
            }

        } catch (DataScienceException e) {
            // Validation failure — caller drops this column for the row.
            return null;
        }
        return rawValue;
    }

    /**
     * Scheduled-style entry point: refresh the url->dataJson cache, then drain the
     * Redis buffers of every "on" HTTP dataset into Greenplum, page by page.
     * Guarded by a Redis lock so only one worker imports at a time.
     */
    public void importDataToGP() {
        List<DatasetDTO> dtos = datasetMapper.getAllUrlStatsOn();
        // On duplicate URLs keep the first dataset's dataJson.
        Map<String, Object> collect = dtos.stream()
                .collect(Collectors.toMap(DatasetDTO::getDataConfig, DatasetDTO::getDataJson, (v1, v2) -> v1));
        redisUtil.hdelAll("urlToDataJson");
        redisUtil.hmset("urlToDataJson", collect);

        String prefix = "urlForImportHttpData_";
        String lockey = "lockey";
        long expireTime = 300L;
        int pageSize = BATCH_INSERT_SIZE;

        Boolean lock = redisUtil.setnx(lockey, "true", expireTime);
        // FIX: null-safe unboxing — `if (lock)` NPE'd when setnx returned null.
        if (Boolean.TRUE.equals(lock)) {
            try {
                for (DatasetDTO dto : dtos) {

                    String url = dto.getDataConfig();
                    String dj = dto.getDataJson();
                    // 根据前缀+url值到redis中获取数据信息 (buffered row count for this URL)
                    int size = (int) redisUtil.lGetListSize(prefix + url);
                    if (size <= 0) {
                        continue;
                    }

                    // 页数 — ceil(size / pageSize)
                    int pages = (size + pageSize - 1) / pageSize;
                    // 真实导入数据 (rows actually imported)
                    Long rowAffect = 0L;

                    for (int i = 0; i < pages; ++i) {
                        // 从redis中分页取出数据 (pop one page from Redis)
                        List<Object> objects = redisUtil.lrPop(prefix + url, pageSize);
                        // 数据存入gp (write the page to Greenplum)
                        rowAffect += (long) saveHttpDataToGP(dj, objects);
                    }

                    // 信息计入data_json (record the import action)
                    if (rowAffect > 0) {
                        datasetActionService.insertAction(rowAffect, dj, dto);
                        logger.info("URL: {}  TABLE: {}： Successfully Imported {} records",
                                url, JSON.parseObject(dto.getDataJson()).getString("table"), rowAffect);
                    } else {
                        logger.info("Import Failed！ URL: {}  TABLE: {}",
                                url, JSON.parseObject(dto.getDataJson()).getString("table"));
                    }
                }
            } catch (Exception e) {
                // FIX: pass the throwable so the stack trace is preserved in the log.
                logger.error("error:Import Failed！ ", e);
            } finally {
                // Always release the lock, even on failure, so the next run can proceed.
                redisUtil.del(lockey);
            }
        }
    }
}
